// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package storage

import (
	"time"

	"github.com/robfig/cron/v3"

	"github.com/apache/skywalking-banyandb/pkg/logger"
)

// Timing parameters of the rotation loop. The int64 values are nanosecond
// counts so they can be compared directly with tick timestamps.
var (
	// creationGap is the base duration from which newSegmentTimeGap is derived.
	creationGap = time.Hour
	// newSegmentTimeGap is creationGap in nanoseconds. A new segment is
	// created ahead of time only when a tick is no further than this from the
	// end of the latest segment.
	newSegmentTimeGap = int64(creationGap)
	// timeEventSnapDuration is the minimal distance, in nanoseconds, between
	// two ticks accepted by Tick.
	timeEventSnapDuration = int64(10 * time.Minute)
)

// Tick offers the timestamp ts (nanoseconds) to the rotation loop.
//
// A tick is ignored when it is less than timeEventSnapDuration after the last
// accepted one. An accepted tick is recorded as the latest tick time and then
// sent to tsEventCh without blocking: if the channel cannot take it right
// away, the event is dropped.
func (d *database[T, O]) Tick(ts int64) {
	lastAccepted := d.latestTickTime.Load()
	if ts-timeEventSnapDuration < lastAccepted {
		return
	}
	d.latestTickTime.Store(ts)

	select {
	case d.tsEventCh <- ts:
	default:
		// The rotation loop is not ready for another event; skip this one.
	}
}

// startRotationTask starts the goroutine that consumes tick events from
// tsEventCh and, when retention is enabled, registers the retention task with
// the scheduler.
//
// For every tick the goroutine runs retention (if enabled), resets the index
// store of each segment that ended before the tick, and creates the next
// segment when the tick is within newSegmentTimeGap of the latest segment's
// end. When the segment controller's idleTimeout is at least one second, it
// also closes idle segments every 10 minutes. The goroutine exits once
// tsEventCh is closed.
//
// It returns nil when retention is disabled, otherwise the result of
// registering the retention task.
func (d *database[T, O]) startRotationTask() error {
	options := d.segmentController.getOptions()
	var rt *retentionTask[T, O]
	if !d.disableRetention {
		rt = newRetentionTask(d, options.TTL)
	}
	go func(retention *retentionTask[T, O]) {
		// A nil channel never becomes ready, so the idle-check case below
		// stays inert unless a ticker is installed here.
		var idleCheckC <-chan time.Time
		if d.segmentController.idleTimeout >= time.Second {
			ticker := time.NewTicker(10 * time.Minute)
			defer ticker.Stop()
			idleCheckC = ticker.C
		}

		for {
			select {
			case ts, ok := <-d.tsEventCh:
				if !ok {
					d.logger.Debug().Msg("tsEventCh closed")
					return
				}
				// The closure scopes the defers to the handling of one tick.
				func() {
					d.rotationProcessOn.Store(true)
					defer d.rotationProcessOn.Store(false)

					eventTime := time.Unix(0, ts)
					if retention != nil {
						retention.run(eventTime, d.logger)
					}

					// Passing true ensures the returned segments are open.
					segs, err := d.segmentController.segments(true)
					if err != nil {
						d.logger.Error().Err(err).Msg("failed to get segments")
						return
					}
					if len(segs) == 0 {
						return
					}
					// Release every segment reference obtained above.
					defer func() {
						for i := range segs {
							segs[i].DecRef()
						}
					}()

					// Reset the index store of segments that ended before the tick.
					for i := range segs {
						if ts > segs[i].End.UnixNano() {
							segs[i].index.store.Reset()
						}
					}

					// A non-positive gap means the event is from the future
					// relative to the latest segment; that segment will be
					// created by a written event directly. A gap larger than
					// newSegmentTimeGap means it is too early to create one.
					gap := segs[len(segs)-1].End.UnixNano() - ts
					withinCreationWindow := gap > 0 && gap <= newSegmentTimeGap
					if !withinCreationWindow {
						return
					}

					d.incTotalRotationStarted(1)
					defer d.incTotalRotationFinished(1)
					start := options.SegmentInterval.NextTime(eventTime)
					d.logger.Info().Time("segment_start", start).Time("event_time", eventTime).Msg("create new segment")
					if _, err = d.segmentController.create(start); err != nil {
						d.logger.Error().Err(err).Msg("failed to create new segment.")
						d.incTotalRotationErr(1)
					}
				}()
			case <-idleCheckC:
				d.logger.Debug().Msg("checking for idle segments")
				if closed := d.segmentController.closeIdleSegments(); closed > 0 {
					d.logger.Info().Int("count", closed).Msg("closed idle segments")
				}
			}
		}
	}(rt)
	if rt == nil {
		return nil
	}
	return d.scheduler.Register("retention", rt.option, rt.expr, rt.run)
}

// retentionTask removes expired data from a database. It is run both by the
// scheduler, using expr and option, and by the rotation loop on every tick.
type retentionTask[T TSTable, O any] struct {
	// database is the database whose segment controller and metrics run uses.
	database *database[T, O]
	// running is a one-slot channel used as a non-blocking guard so that at
	// most one run executes at a time.
	running  chan struct{}
	// expr is the cron expression handed to the scheduler on registration.
	expr     string
	// option tells the cron parser which fields expr contains.
	option   cron.ParseOption
	// duration is subtracted from the current time to obtain the removal
	// deadline passed to the segment controller.
	duration time.Duration
}

// newRetentionTask builds the retention task for database. The retention
// duration is taken from ttl.estimatedDuration().
func newRetentionTask[T TSTable, O any](database *database[T, O], ttl IntervalRule) *retentionTask[T, O] {
	task := new(retentionTask[T, O])
	task.database = database
	// The cron spec has two fields, minute and hour: "5 0" selects minute 5
	// of hour 0.
	task.option = cron.Minute | cron.Hour
	task.expr = "5 0"
	task.duration = ttl.estimatedDuration()
	// Capacity 1 lets run use the channel as a try-lock.
	task.running = make(chan struct{}, 1)
	return task
}

// run removes data older than now minus the task's retention duration.
//
// Only one run executes at a time: if another run is in progress, this call
// returns immediately without doing anything. Started/finished counters are
// updated around every executed run; the has-data counter and latency are
// recorded only when the segment controller reports that data was present.
// A removal error is logged to l and counted, but not returned.
//
// The return value is always true.
func (rc *retentionTask[T, O]) run(now time.Time, l *logger.Logger) bool {
	// Try to take the single slot; give up if a run is already in flight.
	select {
	case rc.running <- struct{}{}:
	default:
		return true
	}
	defer func() {
		<-rc.running
	}()

	rc.database.incTotalRetentionStarted(1)
	defer rc.database.incTotalRetentionFinished(1)
	deadline := now.Add(-rc.duration)
	start := time.Now()
	hasData, err := rc.database.segmentController.remove(deadline)
	if hasData {
		rc.database.incTotalRetentionHasData(1)
		rc.database.incTotalRetentionHasDataLatency(time.Since(start).Seconds())
	}
	if err != nil {
		// The event must be terminated with Msg, otherwise nothing is written.
		l.Error().Err(err).Time("deadline", deadline).Msg("failed to remove expired segments")
		rc.database.incTotalRetentionErr(1)
	}
	return true
}
